-
Couldn't load subscription status.
- Fork 1.8k
out_doris: add new doris out plugin #9514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Does this need additional dependencies? |
This component has no new additional dependencies. |
|
It seems that msvc does not have |
Yes, you have to support all legacy targets I'm afraid so the various vendored libraries have atomic support in place. I'm not sure if there is a general way in Fluent Bit to do it @cosmo0920 ? |
We need to use InterlockedAdd in Windows for the equivalent operation of __sync_fetch_and_add. |
Thank you both! |
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
Signed-off-by: composer <[email protected]>
|
Any timeline for merging this pr to the main line. |
|
|
||
| /* Append headers */ | ||
| flb_http_add_header(c, "format", 6, "json", 4); | ||
| flb_http_add_header(c, "Expect", 6, "100-continue", 12); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This 100-continue return status code 100 will cause retransmission, can it be removed? If needed, can it be added manually?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is necessary for the Doris http stram load API. Adding by the user themselves will increase the complexity of the user configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because of the "100-continue" header, in the group commit mode, fluent-bit retry sends data, and if doris is a "Duplicate Key Model" the logs will produce duplicates.
This kind is not friendly.
It can also provide parameters to control whether to enable 100-continue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure it is caused by 100-continue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I'm sure
|
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days. |
|
@joker-star-l are you able to resolve the conflicts to move this forward? |
add: support json lines
WalkthroughAdds a new Apache Doris output plugin (out_doris) and its CMake/test registrations, plus implementation and configuration code including HTTP delivery, redirect/retry handling, payload composition, and an optional progress reporter thread. Changes
Sequence Diagram(s)sequenceDiagram
participant FB as Fluent Bit
participant Plugin as out_doris plugin
participant Conf as flb_doris_conf_create/destroy
participant Reporter as Progress Reporter
participant Compose as compose_payload
participant HTTP as HTTP client / upstream
participant Doris as Doris stream-load
FB->>Plugin: cb_doris_init
Plugin->>Conf: flb_doris_conf_create (parse options, create upstream)
Conf->>Reporter: spawn reporter thread (if enabled)
FB->>Plugin: cb_doris_flush (records)
Plugin->>Compose: build payload (format, time/date)
Compose->>HTTP: send PUT (auth, headers, label)
HTTP->>Doris: stream-load request
alt 200 OK
Doris-->>HTTP: JSON result (Status...)
HTTP->>Plugin: parse -> OK or RETRY
Plugin->>Reporter: update counters
else 307 Redirect
Doris-->>HTTP: Location header
HTTP->>HTTP: extract new endpoint
HTTP->>Doris: retry PUT (new endpoint)
else Error
HTTP-->>Plugin: RETRY
end
FB->>Plugin: cb_doris_exit
Plugin->>Conf: flb_doris_conf_destroy (cancel reporter, cleanup upstream)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60–75 minutes
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 11
♻️ Duplicate comments (1)
plugins/out_doris/doris.c (1)
144-153: Make “Expect: 100-continue” optional to avoid duplicate inserts on retransmit.Always setting Expect: 100-continue can trigger retransmits and duplicates with certain Doris models. Add a boolean config (default false) to control it.
- flb_http_add_header(c, "Expect", 6, "100-continue", 12); + if (ctx->ins && flb_config_prop_get("expect_100_continue", &ctx->ins->properties)) { + flb_http_add_header(c, "Expect", 6, "100-continue", 12); + }And extend the config map:
// log_progress_interval { FLB_CONFIG_MAP_INT, "log_progress_interval", "10", 0, FLB_TRUE, offsetof(struct flb_out_doris, log_progress_interval), "Specify the interval in seconds to log the progress of the doris stream load" }, + // expect_100_continue + { + FLB_CONFIG_MAP_BOOL, "expect_100_continue", "false", + 0, FLB_FALSE, 0, + "Send Expect: 100-continue header (may cause retransmit/duplicates on some Doris setups)" + },Follow-up: store this as an int flag in ctx if you prefer avoiding property lookups on hot path.
Also applies to: 417-424
🧹 Nitpick comments (7)
cmake/windows-setup.cmake (1)
7-7: Nit: fix message typo.“setttings” → “settings”.
tests/runtime/CMakeLists.txt (1)
128-129: Skip Doris runtime test on Windows.Mirror plugin gating to avoid Windows CI failures until port is ready.
Apply:
- FLB_RT_TEST(FLB_OUT_DORIS "out_doris.c") + if(NOT FLB_SYSTEM_WINDOWS) + FLB_RT_TEST(FLB_OUT_DORIS "out_doris.c") + endif()tests/runtime/out_doris.c (2)
74-93: Deduplicate NULL checks; avoid redundant destroy paths.res_data is checked twice; also two early-return branches call flb_sds_destroy(out_line). Collapse to one guard to reduce risk of double-free on future edits.
- if (!TEST_CHECK(res_data != NULL)) { - TEST_MSG("res_data is NULL"); - return; - } ... - if (!TEST_CHECK(res_data != NULL)) { - TEST_MSG("res_data is NULL"); - flb_sds_destroy(out_line); - return; - } + if (!TEST_CHECK(res_data != NULL)) { + TEST_MSG("res_data is NULL"); + return; + }
142-146: Use flb_time_msleep for portability.sleep(1) is non-portable on Windows. Use the project helper to sleep 1000 ms before shutdown.
- sleep(1); + flb_time_msleep(1000);plugins/out_doris/doris.h (1)
34-35: Avoid fixed-size URI buffer; prefer sds to handle long db/table names.char uri[256] risks truncation with long identifiers. Consider flb_sds_t with checked growth.
- char uri[256]; + flb_sds_t uri;Follow-up: allocate in create, free in destroy.
plugins/out_doris/doris.c (2)
23-28: Remove duplicate include.flb_pack.h is included twice.
-#include <fluent-bit/flb_pack.h> ... -#include <fluent-bit/flb_pack.h>
144-151: Align Doris headers with selected output format.You always set format: json. For json_lines, Doris typically requires an additional header (e.g., read_json_by_line: true). Set headers consistent with ctx->out_format; log the mode.
- flb_http_add_header(c, "format", 6, "json", 4); + flb_http_add_header(c, "format", 6, "json", 4); + if (ctx->out_format == FLB_PACK_JSON_FORMAT_LINES) { + flb_http_add_header(c, "read_json_by_line", 17, "true", 4); + }And ensure compose_payload uses the same ctx->out_format (already done).
Also applies to: 277-305, 393-401
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
cmake/plugins_options.cmake(1 hunks)cmake/windows-setup.cmake(1 hunks)plugins/CMakeLists.txt(1 hunks)plugins/out_doris/CMakeLists.txt(1 hunks)plugins/out_doris/doris.c(1 hunks)plugins/out_doris/doris.h(1 hunks)plugins/out_doris/doris_conf.c(1 hunks)plugins/out_doris/doris_conf.h(1 hunks)tests/runtime/CMakeLists.txt(1 hunks)tests/runtime/out_doris.c(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
plugins/out_doris/doris_conf.h (1)
plugins/out_doris/doris_conf.c (2)
flb_doris_conf_create(73-202)flb_doris_conf_destroy(204-220)
tests/runtime/out_doris.c (4)
lib/monkey/mk_core/external/winpthreads.c (2)
pthread_mutex_lock(168-172)pthread_mutex_unlock(174-178)src/flb_sds.c (1)
flb_sds_destroy(461-471)src/flb_lib.c (8)
flb_service_set(585-611)flb_input(261-271)flb_output(274-284)flb_destroy(223-258)flb_output_set(453-484)flb_output_set_test(517-548)flb_start(753-764)flb_lib_push(642-663)src/flb_time.c (1)
flb_time_msleep(76-87)
plugins/out_doris/doris_conf.c (7)
lib/monkey/mk_core/external/winpthreads.c (4)
pthread_detach(736-750)pthread_self(322-360)pthread_create(676-717)pthread_cancel(484-526)include/fluent-bit/flb_mem.h (2)
flb_calloc(84-96)flb_free(126-128)include/fluent-bit/flb_output.h (1)
flb_output_config_map_set(1068-1091)src/flb_output.c (3)
flb_output_net_default(957-967)flb_output_get_property(994-997)flb_output_upstream_set(1382-1444)src/flb_upstream.c (2)
flb_upstream_create(258-331)flb_upstream_destroy(610-645)src/flb_utils.c (1)
flb_utils_bool(668-682)src/flb_pack.c (1)
flb_pack_to_json_format_type(838-854)
plugins/out_doris/doris.c (7)
plugins/out_doris/doris_conf.c (2)
flb_doris_conf_create(73-202)flb_doris_conf_destroy(204-220)src/flb_output.c (2)
flb_output_set_context(1350-1353)flb_output_set_http_debug_callbacks(1464-1471)src/flb_upstream.c (4)
flb_upstream_create(258-331)flb_upstream_conn_get(657-782)flb_upstream_conn_release(800-870)flb_upstream_destroy(610-645)src/flb_http_client.c (5)
flb_http_client(636-792)flb_http_add_header(896-928)flb_http_basic_auth(1137-1141)flb_http_do(1361-1406)flb_http_client_destroy(1462-1469)include/fluent-bit/flb_sds.h (1)
flb_sds_len(48-51)src/flb_pack.c (2)
flb_pack_json(290-296)flb_pack_msgpack_to_json_format(915-1138)src/flb_sds.c (1)
flb_sds_destroy(461-471)
🔇 Additional comments (2)
plugins/out_doris/CMakeLists.txt (1)
1-6: LGTM.Source aggregation and FLB_PLUGIN wiring look correct.
cmake/plugins_options.cmake (1)
114-114: LGTM.Option naming and default align with existing outputs.
| set(FLB_OUT_COUNTER Yes) | ||
| set(FLB_OUT_CHRONICLE Yes) | ||
| set(FLB_OUT_DATADOG Yes) | ||
| set(FLB_OUT_DORIS Yes) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Disable Doris by default on Windows until MSVC-safe.
Plugin code uses pthread APIs; enabling by default risks Windows/MSVC build failures. Recommend turning it off in Windows defaults until a Windows threading path is provided.
Apply:
- set(FLB_OUT_DORIS Yes)
+ set(FLB_OUT_DORIS No)🤖 Prompt for AI Agents
In cmake/windows-setup.cmake around line 69, the default enables Doris on
Windows which will cause MSVC build failures because the plugin uses pthread
APIs; change the default from Yes to No for FLB_OUT_DORIS and add a brief inline
comment indicating Doris is disabled on Windows until an MSVC-safe threading
implementation is provided.
| REGISTER_OUT_PLUGIN("out_s3") | ||
| REGISTER_OUT_PLUGIN("out_vivo_exporter") | ||
| REGISTER_OUT_PLUGIN("out_chronicle") | ||
| REGISTER_OUT_PLUGIN("out_doris") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Gate Doris plugin registration on non-Windows platforms.
Until Windows support lands, guard like out_plot to prevent MSVC builds from pulling the target.
Apply:
-REGISTER_OUT_PLUGIN("out_doris")
+if (NOT CMAKE_SYSTEM_NAME MATCHES "Windows")
+ REGISTER_OUT_PLUGIN("out_doris")
+endif()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| REGISTER_OUT_PLUGIN("out_doris") | |
| if (NOT CMAKE_SYSTEM_NAME MATCHES "Windows") | |
| REGISTER_OUT_PLUGIN("out_doris") | |
| endif() |
🤖 Prompt for AI Agents
In plugins/CMakeLists.txt around line 347, the REGISTER_OUT_PLUGIN("out_doris")
call must be gated so Windows/MSVC builds don't pull the target; wrap the
registration in a CMake conditional (e.g., if(NOT MSVC) or if(NOT WIN32))
mirroring the out_plot guard and close it with endif() so the plugin is only
registered on non-Windows platforms.
| void *report(void *c) { | ||
| struct flb_out_doris *ctx = (struct flb_out_doris *) c; | ||
|
|
||
| size_t init_time = cfl_time_now() / 1000000000L; | ||
| size_t last_time = init_time; | ||
| size_t last_bytes = ctx->reporter->total_bytes; | ||
| size_t last_rows = ctx->reporter->total_rows; | ||
|
|
||
| size_t cur_time, cur_bytes, cur_rows, total_time, total_speed_mbps, total_speed_rps; | ||
| size_t inc_bytes, inc_rows, inc_time, inc_speed_mbps, inc_speed_rps; | ||
|
|
||
| pthread_detach(pthread_self()); | ||
|
|
||
| flb_plg_info(ctx->ins, "Start progress reporter with interval %d", ctx->log_progress_interval); | ||
|
|
||
| while (ctx->log_progress_interval > 0) { | ||
| sleep(ctx->log_progress_interval); | ||
|
|
||
| cur_time = cfl_time_now() / 1000000000L; | ||
| cur_bytes = ctx->reporter->total_bytes; | ||
| cur_rows = ctx->reporter->total_rows; | ||
| total_time = cur_time - init_time; | ||
| total_speed_mbps = cur_bytes / 1024 / 1024 / total_time; | ||
| total_speed_rps = cur_rows / total_time; | ||
|
|
||
| inc_bytes = cur_bytes - last_bytes; | ||
| inc_rows = cur_rows - last_rows; | ||
| inc_time = cur_time - last_time; | ||
| inc_speed_mbps = inc_bytes / 1024 / 1024 / inc_time; | ||
| inc_speed_rps = inc_rows / inc_time; | ||
|
|
||
| flb_plg_info(ctx->ins, "total %zu MB %zu ROWS, total speed %zu MB/s %zu R/s, last %zu seconds speed %zu MB/s %zu R/s", | ||
| cur_bytes/1024/1024, cur_rows, total_speed_mbps, total_speed_rps, | ||
| inc_time, inc_speed_mbps, inc_speed_rps); | ||
|
|
||
| last_time = cur_time; | ||
| last_bytes = cur_bytes; | ||
| last_rows = cur_rows; | ||
| } | ||
|
|
||
| return NULL; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reporter thread may access freed memory; detach + cancel causes UAF risk. Make it joinable and stop cleanly.
The report() thread dereferences ctx and ctx->reporter inside a loop. conf_destroy() cancels the detached thread and frees reporter/ctx immediately, risking UAF. Prefer a joinable thread and a stop signal, then join before freeing.
-void *report(void *c) {
+void *report(void *c) {
struct flb_out_doris *ctx = (struct flb_out_doris *) c;
@@
- pthread_detach(pthread_self());
+ /* joinable thread; shutdown will signal via log_progress_interval = 0 */
@@
- while (ctx->log_progress_interval > 0) {
- sleep(ctx->log_progress_interval);
+ while (ctx->log_progress_interval > 0) {
+ flb_time_msleep((uint32_t) (ctx->log_progress_interval * 1000));
@@
- last_time = cur_time;
+ last_time = cur_time;
last_bytes = cur_bytes;
last_rows = cur_rows;
}
@@
- if (ctx->log_progress_interval > 0) {
- reporter = flb_calloc(1, sizeof(struct flb_doris_progress_reporter));
+ if (ctx->log_progress_interval > 0) {
+ reporter = flb_calloc(1, sizeof(struct flb_doris_progress_reporter));
@@
- if(pthread_create(&ctx->reporter_thread, NULL, report, (void *) ctx)) {
+ if (pthread_create(&ctx->reporter_thread, NULL, report, (void *) ctx)) {
flb_plg_error(ins, "failed to create progress reporter");
flb_doris_conf_destroy(ctx);
return NULL;
}
}
@@
- if (ctx->reporter) {
- pthread_cancel(ctx->reporter_thread);
- flb_free(ctx->reporter);
- }
+ if (ctx->reporter) {
+ /* signal stop and wait for thread to exit safely */
+ ctx->log_progress_interval = 0;
+ pthread_join(ctx->reporter_thread, NULL);
+ flb_free(ctx->reporter);
+ }Additionally, include the time helper used above:
#include <fluent-bit/flb_utils.h>
+#include <fluent-bit/flb_time.h>Also applies to: 181-201, 214-219
| /* Validate */ | ||
| if (!ctx->user) { | ||
| flb_plg_error(ins, "user is not set"); | ||
| } | ||
| if (!ctx->database) { | ||
| flb_plg_error(ins, "database is not set"); | ||
| } | ||
| if (!ctx->table) { | ||
| flb_plg_error(ins, "table is not set"); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fail init when required params are missing (prevents NULL deref in snprintf).
Currently logs errors but proceeds; ctx->database/table may be NULL and are used below, leading to UB. Return NULL on missing required fields.
/* Validate */
if (!ctx->user) {
flb_plg_error(ins, "user is not set");
+ flb_free(ctx);
+ return NULL;
}
if (!ctx->database) {
flb_plg_error(ins, "database is not set");
+ flb_free(ctx);
+ return NULL;
}
if (!ctx->table) {
flb_plg_error(ins, "table is not set");
+ flb_free(ctx);
+ return NULL;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /* Validate */ | |
| if (!ctx->user) { | |
| flb_plg_error(ins, "user is not set"); | |
| } | |
| if (!ctx->database) { | |
| flb_plg_error(ins, "database is not set"); | |
| } | |
| if (!ctx->table) { | |
| flb_plg_error(ins, "table is not set"); | |
| } | |
| /* Validate */ | |
| if (!ctx->user) { | |
| flb_plg_error(ins, "user is not set"); | |
| flb_free(ctx); | |
| return NULL; | |
| } | |
| if (!ctx->database) { | |
| flb_plg_error(ins, "database is not set"); | |
| flb_free(ctx); | |
| return NULL; | |
| } | |
| if (!ctx->table) { | |
| flb_plg_error(ins, "table is not set"); | |
| flb_free(ctx); | |
| return NULL; | |
| } |
🤖 Prompt for AI Agents
In plugins/out_doris/doris_conf.c around lines 100 to 110, the config validation
only logs missing required fields (user, database, table) but continues, leading
to possible NULL dereferences later; modify the checks so that after logging
flb_plg_error for each missing required parameter the function returns NULL (or
an appropriate error indicator) immediately to abort init when any required
field is missing, ensuring no subsequent code uses ctx->database or ctx->table
when they are NULL.
| struct flb_out_doris *flb_doris_conf_create(struct flb_output_instance *ins, | ||
| struct flb_config *config); | ||
| void flb_doris_conf_destroy(struct flb_out_doris *ctx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Harden config validation and reporter lifecycle.
- Fail fast if user/database/table are unset to avoid NULL deref when building URI.
- Set ctx->reporter only after pthread_create succeeds; otherwise destroy() may cancel an uninitialized thread.
- In destroy(), stop/join the reporter before freeing upstream/context to avoid use-after-free. Prefer cooperative shutdown + pthread_join over pthread_cancel.
Minimal fixes in doris_conf.c (illustrative):
/* After validation */
if (!ctx->user || !ctx->database || !ctx->table) {
flb_plg_error(ins, "user/database/table must be set");
flb_doris_conf_destroy(ctx);
return NULL;
}
/* Start reporter */
if (ctx->log_progress_interval > 0) {
reporter = flb_calloc(1, sizeof(*reporter));
if (!reporter) { flb_plg_error(ins, "failed to create progress reporter"); flb_doris_conf_destroy(ctx); return NULL; }
reporter->total_bytes = reporter->total_rows = reporter->failed_rows = 0;
if (pthread_create(&ctx->reporter_thread, NULL, report, (void *) ctx) != 0) {
flb_plg_error(ins, "failed to start progress reporter");
flb_free(reporter);
flb_doris_conf_destroy(ctx);
return NULL;
}
ctx->reporter = reporter; /* set only after thread starts */
}
/* In flb_doris_conf_destroy() */
if (ctx->reporter) {
/* signal cooperative shutdown if available, then join */
pthread_cancel(ctx->reporter_thread);
pthread_join(ctx->reporter_thread, NULL);
flb_free(ctx->reporter);
}
/* destroy upstream after reporter stops */
if (ctx->u) {
flb_upstream_destroy(ctx->u);
}If Windows support is required, replace pthread usage with a Windows path (e.g., _beginthreadex or a Fluent Bit thread utility) under FLB_SYSTEM_WINDOWS.
Run to locate pthread usage that needs Windows-compatible handling:
🏁 Script executed:
#!/bin/bash
rg -n -C2 -g 'plugins/out_doris/**' -e 'pthread_create|pthread_cancel|pthread_join'Length of output: 689
---
🏁 Script executed:
```shell
#!/bin/bash
# Retrieve full flb_doris_conf_destroy function
rg -n -A15 'void flb_doris_conf_destroy' plugins/out_doris/doris_conf.c
Length of output: 352
🏁 Script executed:
#!/bin/bash
# Retrieve full flb_doris_conf_create function to check validation and reporter setup
rg -n -A30 'struct flb_out_doris \*flb_doris_conf_create' plugins/out_doris/doris_conf.c | head -60Length of output: 927
🏁 Script executed:
#!/bin/bash
# Check for user/database/table validation or NULL checks
rg -n -C3 -e 'user|database|table' plugins/out_doris/doris_conf.c | head -80Length of output: 663
🏁 Script executed:
#!/bin/bash
# Check for Windows threading guards in fluent-bit
fd -t f 'mk_thread|win32' --search-path . | head -20Length of output: 868
🏁 Script executed:
#!/bin/bash
# Search for Windows-compatible thread utilities in fluent-bit
rg -n -e 'FLB_SYSTEM_WINDOWS|_beginthreadex|mk_thread_create' --type=c | head -30Length of output: 1470
---
🏁 Script executed:
```shell
#!/bin/bash
# Check what the reporter thread accesses in ctx
rg -n -A20 'void \*report' plugins/out_doris/doris_conf.c
Length of output: 880
🏁 Script executed:
#!/bin/bash
# Check if fluent-bit provides thread wrapper macros for plugins
rg -n -e 'flb_thread|pthread_create' include/fluent-bit/flb_*.h | head -20Length of output: 460
Harden config validation and reporter lifecycle.
The code has three critical issues:
-
Validation fails silently (lines 101-109): Errors are logged but execution continues to line 139 where
snprintfwill NULL-deref ifdatabaseortableare unset. Must return early on validation failure. -
Reporter lifecycle error (line 192):
ctx->reporteris assigned beforepthread_createsucceeds at line 194. If creation fails,destroy()will cancel an invalid/uninitialized thread handle. -
Unsafe destroy order (lines 210-217): Upstream is freed at line 211, then reporter teardown happens at line 215. The reporter thread accesses
ctxandctx->reporter; freeing upstream first risks use-after-free. Additionally, nopthread_joinis used;pthread_cancelalone is unclean. Since the reporter is detached (line 41), add a shutdown flag or cooperative stop mechanism. -
No Windows support (lines 194, 215):
pthread_createandpthread_cancelare used unconditionally with noFLB_SYSTEM_WINDOWSguards. The codebase standard is to conditionally guard POSIX calls.
Minimal fixes in doris_conf.c:
/* In flb_doris_conf_create(), before upstream setup: */
if (!ctx->user || !ctx->database || !ctx->table) {
flb_plg_error(ins, "user, database, and table must be set");
flb_doris_conf_destroy(ctx);
return NULL;
}
/* Reporter setup: assign ctx->reporter only after pthread_create succeeds */
if (ctx->log_progress_interval > 0) {
reporter = flb_calloc(1, sizeof(*reporter));
if (!reporter) {
flb_plg_error(ins, "failed to allocate progress reporter");
flb_doris_conf_destroy(ctx);
return NULL;
}
reporter->total_bytes = reporter->total_rows = reporter->failed_rows = 0;
#ifdef FLB_SYSTEM_WINDOWS
/* Use Windows thread API or fluent-bit abstraction */
#else
if (pthread_create(&ctx->reporter_thread, NULL, report, (void *)ctx) != 0) {
flb_plg_error(ins, "failed to start progress reporter");
flb_free(reporter);
flb_doris_conf_destroy(ctx);
return NULL;
}
ctx->reporter = reporter; /* assign only after thread creation succeeds */
#endif
}
/* In flb_doris_conf_destroy(): stop reporter before freeing upstream */
if (ctx->reporter) {
#ifndef FLB_SYSTEM_WINDOWS
pthread_cancel(ctx->reporter_thread);
/* Optional: add cooperative shutdown flag to allow graceful exit */
#endif
flb_free(ctx->reporter);
}
if (ctx->u) {
flb_upstream_destroy(ctx->u);
}🤖 Prompt for AI Agents
In plugins/out_doris/doris_conf.c (validation at ~lines 101-109 and snprintf use
at ~139; reporter creation at ~192-194; destroy/order at ~210-217) and header
plugins/out_doris/doris_conf.h (prototypes at lines 28-30): ensure validation
failures return early (log error then flb_doris_conf_destroy(ctx); return NULL)
so database/table/user are never NULL-dereferenced; allocate reporter and start
the thread before assigning ctx->reporter (only set ctx->reporter after
pthread_create succeeds) and if thread start fails free reporter and clean up;
in flb_doris_conf_destroy stop the reporter cooperatively before freeing ctx or
upstream (use a shutdown flag the reporter polls, avoid freeing upstream until
reporter has exited, and prefer pthread_join or a polite shutdown over
pthread_cancel); finally, wrap pthread_create/pthread_cancel/pthread_join and
any POSIX-specific logic with #ifndef FLB_SYSTEM_WINDOWS guards and provide
Windows-path placeholders or use the project thread abstraction.
| u_conn = flb_upstream_conn_get(u); | ||
| if (!u_conn) { | ||
| flb_plg_error(ctx->ins, "no upstream connections available to %s:%i", | ||
| u->tcp_host, u->tcp_port); | ||
| return FLB_RETRY; | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix leak on redirect upstream creation when connection acquisition fails.
On flb_upstream_conn_get failure, early return leaks the temporary upstream u. Destroy it if u != ctx->u.
u_conn = flb_upstream_conn_get(u);
if (!u_conn) {
flb_plg_error(ctx->ins, "no upstream connections available to %s:%i",
u->tcp_host, u->tcp_port);
- return FLB_RETRY;
+ if (u != ctx->u) {
+ flb_upstream_destroy(u);
+ }
+ return FLB_RETRY;
}
@@
- if (u != ctx->u) {
- flb_upstream_destroy(u);
- }
+ if (u != ctx->u) {
+ flb_upstream_destroy(u);
+ }Also applies to: 270-273
🤖 Prompt for AI Agents
In plugins/out_doris/doris.c around lines 121-127 (and similarly 270-273), when
flb_upstream_conn_get() fails the function currently returns early but leaks the
temporary upstream 'u'; if the upstream created for redirect is not the
module-level ctx->u you must call flb_upstream_destroy(u) (or the appropriate
upstream destroy API used in this codebase) before returning. Update both
failure paths to check if (u && u != ctx->u) then destroy the upstream, and then
return FLB_RETRY to avoid the leak.
| flb_config_map_foreach(head, mv, ctx->headers) { | ||
| key = mk_list_entry_first(mv->val.list, struct flb_slist_entry, _head); | ||
| val = mk_list_entry_last(mv->val.list, struct flb_slist_entry, _head); | ||
|
|
||
| flb_http_add_header(c, | ||
| key->str, flb_sds_len(key->str), | ||
| val->str, flb_sds_len(val->str)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use correct lengths for slist header entries (not flb_sds_len).
struct flb_slist_entry->str is a C string. Using flb_sds_len here is incorrect and may crash. Use the recorded entry length or strlen.
- flb_http_add_header(c,
- key->str, flb_sds_len(key->str),
- val->str, flb_sds_len(val->str));
+ flb_http_add_header(c,
+ key->str, strlen(key->str),
+ val->str, strlen(val->str));🤖 Prompt for AI Agents
In plugins/out_doris/doris.c around lines 154 to 161, the code calls flb_sds_len
on flb_slist_entry->str (a plain C string) which is incorrect and unsafe;
replace flb_sds_len(key->str) and flb_sds_len(val->str) with the slist entry's
recorded length (e.g., key->len and val->len) or, if that field is unavailable,
use strlen(key->str) and strlen(val->str) — prefer the recorded length for
performance and correctness — and pass those lengths to flb_http_add_header.
| ret = flb_http_do(c, &b_sent); | ||
| if (ret == 0) { | ||
| if (ctx->log_request) { | ||
| flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", | ||
| host, port, | ||
| c->resp.status, c->resp.payload); | ||
| } else { | ||
| flb_plg_debug(ctx->ins, "%s:%i, HTTP status=%i\n%s\n", | ||
| host, port, | ||
| c->resp.status, c->resp.payload); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard JSON parse path; avoid using uninitialized buffers on flb_pack_json failure.
When flb_pack_json returns -1, out_buf/out_size are undefined but code continues to unpack. Bail out early.
- else if (c->resp.status == 200 && c->resp.payload_size > 0) {
- ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
- &out_buf, &out_size, &root_type, NULL);
-
- if (ret == -1) {
- out_ret = FLB_RETRY;
- }
-
- msgpack_unpacked_init(&result);
+ else if (c->resp.status == 200 && c->resp.payload_size > 0) {
+ ret = flb_pack_json(c->resp.payload, c->resp.payload_size,
+ &out_buf, &out_size, &root_type, NULL);
+ if (ret == -1) {
+ out_ret = FLB_RETRY;
+ goto parse_done;
+ }
+ msgpack_unpacked_init(&result);
ret = msgpack_unpack_next(&result, out_buf, out_size, &off);
if (ret != MSGPACK_UNPACK_SUCCESS) {
out_ret = FLB_RETRY;
- }
+ goto free_buf;
+ }
@@
- flb_free(out_buf);
- msgpack_unpacked_destroy(&result);
+free_buf:
+ flb_free(out_buf);
+ msgpack_unpacked_destroy(&result);
+parse_done:
}Also applies to: 192-242
🤖 Prompt for AI Agents
In plugins/out_doris/doris.c around lines 166 to 176 (and similarly for lines
192-242), the code calls flb_pack_json and proceeds to use out_buf/out_size even
when flb_pack_json returns -1 (undefined output); modify the control flow to
check flb_pack_json's return value immediately, and if it returns -1, log an
error, free any allocated resources (including the c connection, out_buf if set,
and any temporary buffers), set an appropriate error/return code, and skip the
unpack/JSON path to avoid using uninitialized buffers; apply the same guard and
cleanup logic to the other affected block (lines ~192-242).
| if (c->resp.status == 307) { // redict | ||
| // example: Location: http://admin:[email protected]:8040/api/d_fb/t_fb/_stream_load? | ||
| char* location = strstr(c->resp.data, "Location:"); | ||
| char* start = strstr(location, "@") + 1; | ||
| char* mid = strstr(start, ":"); | ||
| char* end = strstr(mid, "/api"); | ||
| char redict_host[1024] = {0}; | ||
| memcpy(redict_host, start, mid - start); | ||
| char redict_port[10] = {0}; | ||
| memcpy(redict_port, mid + 1, end - (mid + 1)); | ||
|
|
||
| out_ret = http_put(ctx, redict_host, atoi(redict_port), | ||
| body, body_len, tag, tag_len, label, label_len); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Harden redirect handling; don’t parse raw buffer; handle absent header/IPv6/URI.
Parsing Location from c->resp.data and splitting by '@', ':' and '/api' is brittle and unsafe (NULL deref if no header). Use the HTTP client header API and robust URI parse. Also bound redirect hops to prevent infinite loops.
- if (c->resp.status == 307) { // redict
- // example: Location: http://admin:[email protected]:8040/api/d_fb/t_fb/_stream_load?
- char* location = strstr(c->resp.data, "Location:");
- char* start = strstr(location, "@") + 1;
- char* mid = strstr(start, ":");
- char* end = strstr(mid, "/api");
- char redict_host[1024] = {0};
- memcpy(redict_host, start, mid - start);
- char redict_port[10] = {0};
- memcpy(redict_port, mid + 1, end - (mid + 1));
-
- out_ret = http_put(ctx, redict_host, atoi(redict_port),
- body, body_len, tag, tag_len, label, label_len);
- }
+ if (c->resp.status == 307) {
+ const char *loc = NULL; size_t loc_len = 0;
+ char hostbuf[256]; int portbuf = 0;
+ /* Prefer API (if available) to get header value */
+ if (flb_http_get_header(c, "Location", 8, &loc, &loc_len) == 0 && loc && loc_len > 0) {
+ /* Parse URI: scheme://[user[:pass]@]host[:port]/path ... */
+ if (flb_utils_url_split(loc, loc_len, NULL, NULL, hostbuf, sizeof(hostbuf), &portbuf, NULL, 0) == 0) {
+ out_ret = http_put(ctx, hostbuf, portbuf > 0 ? portbuf : port,
+ body, body_len, tag, tag_len, label, label_len);
+ }
+ else {
+ flb_plg_error(ctx->ins, "invalid Location header");
+ out_ret = FLB_RETRY;
+ }
+ }
+ else {
+ flb_plg_error(ctx->ins, "Location header missing on 307");
+ out_ret = FLB_RETRY;
+ }
+ }If flb_http_get_header/flb_utils_url_split are unavailable, fall back to a minimal parser with strict checks and add a max redirect counter.
🤖 Prompt for AI Agents
In plugins/out_doris/doris.c around lines 178 to 191, the redirect handling
currently parses c->resp.data with unsafe strstr/memcpy assumptions (NULL deref,
no IPv6/userinfo/URI handling) — replace this with robust logic: read the
Location header via the HTTP client header API (flb_http_get_header) or a proper
URL parser (flb_utils_url_split) if available; if those are not present
implement a guarded fallback that checks for NULLs at every strstr step,
supports optional userinfo (skip "user:pass@" if present) and IPv6 brackets
("[::1]"), bounds copies with size checks and snprintf/strn* to avoid overflow,
validate port presence and numeric range, and abort redirect processing if
parsing fails; also add and enforce a max redirect counter (e.g., 5) to prevent
infinite loops and return an error if exceeded.
| #ifndef FLB_OUT_DORIS_H | ||
| #define FLB_OUT_DORIS_H | ||
|
|
||
| #include <fluent-bit/flb_pthread.h> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Make header self-contained; include flb_sds.h and forward-declare used structs.
This header uses flb_sds_t and pointers to opaque structs without including their declarations. Add the minimal includes/forward-decls to prevent include-order fragility.
#include <fluent-bit/flb_pthread.h>
+/* required for flb_sds_t */
+#include <fluent-bit/flb_sds.h>
+/* forward declarations for opaque pointers */
+struct flb_upstream;
+struct flb_output_instance;
+struct mk_list;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #include <fluent-bit/flb_pthread.h> | |
| #include <fluent-bit/flb_pthread.h> | |
| /* required for flb_sds_t */ | |
| #include <fluent-bit/flb_sds.h> | |
| /* forward declarations for opaque pointers */ | |
| struct flb_upstream; | |
| struct flb_output_instance; | |
| struct mk_list; |
🤖 Prompt for AI Agents
In plugins/out_doris/doris.h around line 23, the header is not self-contained:
it uses flb_sds_t and pointers to opaque Fluent Bit structs without declaring
them; add #include <fluent-bit/flb_sds.h> and add forward declarations for any
opaque structs referenced in this header (for example: struct flb_config; struct
flb_input_instance; struct flb_output_instance; or whichever specific flb_*
struct pointer types are used) so the header can be included in any order
without require consumer translation units to include other Fluent Bit headers
first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (5)
plugins/out_doris/doris.c (5)
155-162: Use strlen instead of flb_sds_len for flb_slist_entry strings.
flb_slist_entry->stris a plain C string, not anflb_sds_t. Callingflb_sds_lenon it is incorrect and will likely crash when accessing non-existent metadata before the string pointer.Apply this diff to use
strlenfor plain C strings:flb_http_add_header(c, - key->str, flb_sds_len(key->str), - val->str, flb_sds_len(val->str)); + key->str, strlen(key->str), + val->str, strlen(val->str));
121-126: Fix upstream leak on connection acquisition failure.When
flb_upstream_conn_getfails (line 121-122), the function returns early at line 125 without destroying the temporary upstreamucreated for redirect (lines 115-120). This leaks the upstream resource.Apply this diff to destroy the temporary upstream before returning:
u_conn = flb_upstream_conn_get(u); if (!u_conn) { flb_plg_error(ctx->ins, "no upstream connections available to %s:%i", u->tcp_host, u->tcp_port); + if (u != ctx->u) { + flb_upstream_destroy(u); + } return FLB_RETRY; }
45-55: Fix MSVC atomics: use correct Interlocked API and include for size_t.The Windows atomic implementation has critical issues:
InterlockedAddexpectsvolatile LONG*, notsize_t*, causing type mismatch and potential undefined behavior on 64-bit.- Include
<windows.h>instead of<winnt.h>for proper Interlocked API availability.- Use
InterlockedAdd64for 64-bit andInterlockedAddfor 32-bit with appropriate casts.Apply this diff to fix the atomic implementation:
#ifdef FLB_SYSTEM_WINDOWS -#include <winnt.h> +#include <windows.h> #endif static inline void sync_fetch_and_add(size_t *dest, size_t value) { #ifdef FLB_SYSTEM_WINDOWS - InterlockedAdd(dest, value); + #if defined(_WIN64) + InterlockedAdd64((volatile LONG64 *)dest, (LONG64)value); + #else + InterlockedAdd((volatile LONG *)dest, (LONG)value); + #endif #else __sync_fetch_and_add(dest, value); #endif }
179-192: Harden redirect handling to prevent crashes and infinite loops.The redirect parsing is brittle and unsafe:
- Line 181:
strstrmay return NULL ifLocationheader is missing.- Lines 182-184: Subsequent
strstrcalls assume prior calls succeeded; no NULL checks lead to NULL pointer dereference.- No support for IPv6 addresses
[::1]:portor missing userinfo.- No bounds checking on
memcpyoperations (lines 186, 188).- Unbounded recursion allows infinite redirect loops.
Replace the manual parsing with robust logic:
- Use the HTTP client header API (if available, e.g.,
flb_http_get_header) or implement guarded fallback with NULL checks at every step.- Support optional userinfo and IPv6 brackets.
- Add and enforce a maximum redirect counter (e.g., 5) to prevent infinite loops.
- Validate extracted host/port before recursing.
Example guarded approach:
if (c->resp.status == 307) { - // example: Location: http://admin:[email protected]:8040/api/d_fb/t_fb/_stream_load? - char* location = strstr(c->resp.data, "Location:"); - char* start = strstr(location, "@") + 1; - char* mid = strstr(start, ":"); - char* end = strstr(mid, "/api"); - char redict_host[1024] = {0}; - memcpy(redict_host, start, mid - start); - char redict_port[10] = {0}; - memcpy(redict_port, mid + 1, end - (mid + 1)); - - out_ret = http_put(ctx, redict_host, atoi(redict_port), - body, body_len, tag, tag_len, label, label_len); + char* location = strstr(c->resp.data, "Location:"); + if (!location) { + flb_plg_error(ctx->ins, "Location header missing on 307"); + out_ret = FLB_RETRY; + } else { + /* TODO: Implement robust URI parser with IPv6/userinfo support and redirect limit */ + flb_plg_error(ctx->ins, "307 redirect parsing needs hardening"); + out_ret = FLB_RETRY; + } }Consider using a proper URL parser utility if available in the Fluent Bit codebase.
193-243: Guard against using uninitialized buffers when flb_pack_json fails.When
flb_pack_jsonreturns -1 (line 197),out_bufandout_sizeare undefined, but the code continues to use them:
- Line 201: Initializes
msgpack_unpacked resultunconditionally.- Line 202: Calls
msgpack_unpack_nextwith undefinedout_buf/out_size.- Lines 207-239: Parses invalid data.
- Line 241: Calls
flb_free(out_buf)on potentially uninitialized pointer.Apply this diff to bail out early when JSON parsing fails:
else if (c->resp.status == 200 && c->resp.payload_size > 0) { ret = flb_pack_json(c->resp.payload, c->resp.payload_size, &out_buf, &out_size, &root_type, NULL); - if (ret == -1) { + flb_plg_error(ctx->ins, "failed to parse response JSON"); out_ret = FLB_RETRY; + goto cleanup_http; } msgpack_unpacked_init(&result); ret = msgpack_unpack_next(&result, out_buf, out_size, &off); if (ret != MSGPACK_UNPACK_SUCCESS) { + flb_plg_error(ctx->ins, "failed to unpack response"); out_ret = FLB_RETRY; + flb_free(out_buf); + msgpack_unpacked_destroy(&result); + goto cleanup_http; } root = result.data; if (root.type != MSGPACK_OBJECT_MAP) { + flb_plg_error(ctx->ins, "response root is not a map"); out_ret = FLB_RETRY; + flb_free(out_buf); + msgpack_unpacked_destroy(&result); + goto cleanup_http; } for (i = 0; i < root.via.map.size; i++) { msg_key = root.via.map.ptr[i].key; if (msg_key.type != MSGPACK_OBJECT_STR) { out_ret = FLB_RETRY; break; } if (msg_key.via.str.size == 6 && strncmp(msg_key.via.str.ptr, "Status", 6) == 0) { msg_val = root.via.map.ptr[i].val; if (msg_val.type != MSGPACK_OBJECT_STR) { out_ret = FLB_RETRY; break; } if (msg_val.via.str.size == 7 && strncmp(msg_val.via.str.ptr, "Success", 7) == 0) { out_ret = FLB_OK; break; } if (msg_val.via.str.size == 15 && strncmp(msg_val.via.str.ptr, "Publish Timeout", 15) == 0) { out_ret = FLB_OK; break; } out_ret = FLB_RETRY; break; } } flb_free(out_buf); msgpack_unpacked_destroy(&result); } +cleanup_http:Note: Add the
cleanup_http:label before the existing cleanup section (line 254).
🧹 Nitpick comments (1)
plugins/out_doris/doris.c (1)
145-147: Consider adding a config option to control the Expect: 100-continue header.Line 147 unconditionally adds
Expect: 100-continue. A past reviewer (yunsur) noted that in group commit mode with Doris "Duplicate Key Model," this header causes Fluent Bit retry to produce duplicate logs, which is unfriendly to users.Consider adding a boolean configuration option (e.g.,
use_100_continue) to allow users to disable this header when needed:// In config_map (around line 421): { FLB_CONFIG_MAP_BOOL, "use_100_continue", "true", 0, FLB_TRUE, offsetof(struct flb_out_doris, use_100_continue), "Enable or disable Expect: 100-continue header" },Then conditionally add the header in http_put:
if (ctx->use_100_continue) { flb_http_add_header(c, "Expect", 6, "100-continue", 12); }Based on learnings
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
plugins/out_doris/doris.c(1 hunks)plugins/out_doris/doris_conf.c(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/out_doris/doris_conf.c
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-04T12:32:46.030Z
Learnt from: shadowshot-x
PR: fluent/fluent-bit#10825
File: plugins/out_s3/s3.c:0-0
Timestamp: 2025-09-04T12:32:46.030Z
Learning: In Fluent Bit plugins, avoid creating plugin-local config_map entries that duplicate core functionality. The core already provides Retry_Limit parsed into ins->retry_limit, so plugins should use that directly rather than defining their own retry_limit configuration option.
Applied to files:
plugins/out_doris/doris.c
📚 Learning: 2025-09-08T11:21:33.975Z
Learnt from: cosmo0920
PR: fluent/fluent-bit#10851
File: include/fluent-bit/flb_simd.h:60-66
Timestamp: 2025-09-08T11:21:33.975Z
Learning: Fluent Bit currently only supports MSVC compiler on Windows, so additional compiler compatibility guards may be unnecessary for Windows-specific code paths.
Applied to files:
plugins/out_doris/doris.c
🧬 Code graph analysis (1)
plugins/out_doris/doris.c (8)
plugins/out_doris/doris_conf.c (2)
flb_doris_conf_create(73-202)flb_doris_conf_destroy(204-220)src/flb_output.c (2)
flb_output_set_context(1350-1353)flb_output_set_http_debug_callbacks(1464-1471)src/flb_upstream.c (4)
flb_upstream_create(258-331)flb_upstream_conn_get(657-782)flb_upstream_conn_release(800-870)flb_upstream_destroy(610-645)src/flb_http_client.c (5)
flb_http_client(636-792)flb_http_add_header(896-928)flb_http_basic_auth(1137-1141)flb_http_do(1361-1406)flb_http_client_destroy(1462-1469)include/fluent-bit/flb_sds.h (1)
flb_sds_len(48-51)src/flb_pack.c (2)
flb_pack_json(290-296)flb_pack_msgpack_to_json_format(915-1138)include/fluent-bit/flb_mem.h (1)
flb_free(126-128)src/flb_sds.c (1)
flb_sds_destroy(461-471)
🔇 Additional comments (6)
plugins/out_doris/doris.c (6)
57-78: LGTM!The initialization logic correctly creates the context, registers it with the plugin instance, and enables HTTP debug callbacks.
278-306: LGTM!The payload composition logic correctly transforms msgpack to JSON using the configured format and date key, with proper error handling.
308-350: LGTM!The flush callback correctly composes the payload, generates labels, invokes the HTTP PUT, and updates progress metrics. The cleanup of
out_bodyis appropriate sinceflb_sds_destroyhandles NULL safely.
352-358: LGTM!The exit callback correctly destroys the plugin context.
427-445: LGTM!The test formatter callback correctly exercises the payload composition logic for testing.
447-462: LGTM!The plugin descriptor correctly configures the Doris output plugin with networking, optional TLS, and 2 workers for concurrent processing.


Add new doris out plugin.
#9501
Enter
[N/A]in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-testlabel to test for all targets (requires maintainer to do).Documentation
fluent-bit-docs/pull/1483
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Tests